Flink 1.9 Dimension Table Joins: Implementing a Custom LookupableTableSource
Streams and Dimension Tables
- Stream: an unbounded flow of data. In practice, stream records usually carry IDs plus associated business attributes, such as a user id paired with a product id.
- Dimension table: a set of data that rarely changes, such as the user name, gender, and other attributes behind the user id in the stream above.
The Problem with Joining Streams and Dimension Tables
- The stream moves fast, while dimension data usually sits in a third-party store. If every record has to reach out to the dimension table for its join, the repeated network round trips add up to long waits.
- Querying the third-party store in real time for every join also puts heavy pressure on that system.
Approaches to Joining Streams and Dimension Tables
Async IO
Before Flink 1.9 was released, the usual dimension table join was implemented with Async I/O plus a cache. I won't go into detail here; if you are interested, see 袋鼠云/flinkStreamSQL.
The drawback is obvious: Async I/O produces a DataStream, so registering the result with the Table API means parsing the join semantics yourself, which is complex.
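For context, here is a minimal sketch of that older pattern, built on Flink's RichAsyncFunction and the lettuce redis client. The class name, key layout, and output schema are hypothetical illustrations, not code from flinkStreamSQL; error handling and timeouts are omitted.

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.types.Row;

import java.util.Collections;

public class AsyncDimLookup extends RichAsyncFunction<Row, Row> {
    private transient RedisClient redisClient;
    private transient StatefulRedisConnection<String, String> connection;

    @Override
    public void open(Configuration parameters) {
        redisClient = RedisClient.create("redis://localhost:6379/0");
        connection = redisClient.connect();
    }

    @Override
    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
        // fetch the dimension value without blocking the task thread,
        // then emit the input row widened with the looked-up field
        connection.async().get("userInfo:userId:" + input.getField(0))
                .thenAccept(userName -> {
                    Row out = new Row(input.getArity() + 1);
                    for (int i = 0; i < input.getArity(); i++) {
                        out.setField(i, input.getField(i));
                    }
                    out.setField(input.getArity(), userName);
                    resultFuture.complete(Collections.singleton(out));
                });
    }

    @Override
    public void close() {
        if (connection != null) {
            connection.close();
        }
        if (redisClient != null) {
            redisClient.shutdown();
        }
    }
}

// usage:
// DataStream<Row> enriched = AsyncDataStream.unorderedWait(
//         stream, new AsyncDimLookup(), 100, TimeUnit.MILLISECONDS);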
LookupableTableSource
With the Flink 1.9 release we can use the Blink planner contributed by Alibaba: implement the LookupableTableSource interface, register it as a table, and join against it flexibly in the Table API. (Note: according to the documentation, LookupableTableSource only supports inner join and left join, and only as the right-side table.)
Walking Through the LookupableTableSource Implementation
I won't restate what LookupableTableSource is; the posts in the references at the end cover it well. Let's get straight to the code.
Implementing the LookupableTableSource Interface
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public class DemoLookupableTableSource implements LookupableTableSource<Row> {

    private final String[] fieldNames;
    private final TypeInformation[] fieldTypes;
    private final boolean isAsync;

    public DemoLookupableTableSource(String[] fieldNames, TypeInformation[] fieldTypes, boolean isAsync) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.isAsync = isAsync;
    }

    /**
     * The synchronous lookup; implement it with a custom TableFunction.
     * A TableFunction is essentially a UDTF: one input row may produce one row or many.
     *
     * @param strings the lookup key names; for "a join b on b.id = a.user_id" this is {"id"}
     * @return org.apache.flink.table.functions.TableFunction<org.apache.flink.types.Row>
     * @author cuishilei
     * @date 2019/9/1
     */
    @Override
    public TableFunction<Row> getLookupFunction(String[] strings) {
        return null;
    }

    /**
     * The asynchronous lookup; implement it with a custom AsyncTableFunction,
     * which works much like async I/O.
     * A TableFunction is essentially a UDTF: one input row may produce one row or many.
     *
     * @param strings the lookup key names; for "a join b on b.id = a.user_id" this is {"id"}
     * @return org.apache.flink.table.functions.AsyncTableFunction<org.apache.flink.types.Row>
     * @author cuishilei
     * @date 2019/9/1
     */
    @Override
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {
        return null;
    }

    /**
     * Tells the planner how to fetch the source data: return true if you want
     * getAsyncLookupFunction to be used instead of getLookupFunction.
     *
     * @return boolean
     * @author cuishilei
     * @date 2019/9/1
     */
    @Override
    public boolean isAsyncEnabled() {
        return isAsync;
    }

    /**
     * Declares the table structure; the TableSchema carries the field names and types.
     *
     * @return org.apache.flink.table.api.TableSchema
     * @author cuishilei
     * @date 2019/9/1
     */
    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
                .build();
    }

    /**
     * Declares the returned column info. I have not dug into what happens if this
     * declares fewer fields than the TableSchema, e.g. whether it acts as a projection
     * or must match the schema exactly; if you know, please leave a comment.
     *
     * @return org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.types.Row>
     * @author cuishilei
     * @date 2019/9/1
     */
    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}
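The demo above returns null from both factory methods; a real source returns the concrete function. Here is a sketch of how getLookupFunction could hand the planner the Redis function defined below, assuming the source also carries the Redis connection and cache settings as fields (those field names are assumptions, not part of the demo class):

@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
    // lookupKeys are the right-table columns referenced in the ON clause,
    // e.g. {"userId"} for "on i.userId = a.id"
    return RedisTableFunction.newBuilder()
            .withIp(ip)                          // assumed config fields on the source
            .withPort(port)
            .withDatabase(database)
            .withTableName(tableName)
            .withFieldNames(fieldNames)
            .withFieldTypes(fieldTypes)
            .withJoinKeyNames(lookupKeys)
            .withIsCached(isCached)
            .withCacheType(cacheType)
            .withCacheMaxSize(cacheMaxSize)
            .withCacheExpireMs(cacheExpireMs)
            .withMaxRetryTimes(maxRetryTimes)
            .build();
}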
Implementing TableFunction or AsyncTableFunction
For the official description of TableFunction, see the Flink documentation; in short, it is a user-defined table function that can emit zero, one, or many rows per call.
Let's implement the synchronous TableFunction first.
Abstracting Out AbstractTableFuc
Why abstract first? In practice, dimension data can live in many kinds of stores (redis, hbase, an RDBMS, and so on), yet much of a TableFunction is shared boilerplate, such as getResultType declaring the field info of the returned rows. Pulling the common parts into a base class lets each concrete TableFunction focus only on fetching data from its own store.
Here is what I have abstracted out so far:
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.TimeUnit;

public abstract class AbstractTableFuc extends TableFunction<Row> {

    private static final long serialVersionUID = -1272426765940357679L;

    /**
     * field names of the returned row
     */
    protected final String[] fieldNames;
    /**
     * field types of the returned row
     */
    protected final TypeInformation[] fieldTypes;
    /**
     * field names of the join keys
     */
    protected final String[] joinKeyNames;
    protected final boolean isCached;
    protected final String cacheType;
    protected final long cacheMaxSize;
    protected final long cacheExpireMs;
    protected final int maxRetryTimes;
    /**
     * indexes of the join key fields
     */
    protected int[] joinKeyIndexes;
    /**
     * non-join-key field names
     */
    protected String[] otherFieldNames;
    /**
     * indexes of the non-join-key fields
     */
    protected int[] otherFieldNamesIndexes;
    /**
     * the cache
     */
    protected transient Cache<String, Row> cache;

    public AbstractTableFuc(String[] fieldNames, TypeInformation[] fieldTypes, String[] joinKeyNames,
                            boolean isCached, String cacheType, long cacheMaxSize, long cacheExpireMs,
                            int maxRetryTimes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.joinKeyNames = joinKeyNames;
        this.isCached = isCached;
        this.cacheType = cacheType;
        this.cacheMaxSize = cacheMaxSize;
        this.cacheExpireMs = cacheExpireMs;
        this.maxRetryTimes = maxRetryTimes;
        // find the indexes of the join key fields (FieldUtil is a small helper from the accompanying repo)
        joinKeyIndexes = FieldUtil.getFieldIndexes(fieldNames, joinKeyNames);
        // find the non-join-key fields and their indexes
        otherFieldNames = Sets.difference(Sets.newHashSet(fieldNames), Sets.newHashSet(joinKeyNames))
                .toArray(new String[]{});
        otherFieldNamesIndexes = FieldUtil.getFieldIndexes(fieldNames, otherFieldNames);
    }

    protected void initCache() {
        if (isCached) {
            if ("ALL".equals(cacheType)) {
                // full-table caching uses a LoadingCache that re-loads entries after the refresh interval
                cache = Caffeine.newBuilder()
                        .refreshAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                        .build(new CacheLoader<String, Row>() {
                            @Nullable
                            @Override
                            public Row load(@NonNull String s) {
                                return getRow(s);
                            }
                        });
            } else {
                // partial caching uses a plain Cache bounded by size and TTL
                cache = Caffeine.newBuilder()
                        .maximumSize(cacheMaxSize)
                        .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                        .build();
            }
        }
    }

    /**
     * Returns the row for the given lookup argument.
     *
     * @param data source-specific lookup parameter
     * @return org.apache.flink.types.Row
     * @author cuishilei
     * @date 2019/9/1
     */
    abstract Row getRow(Object data);

    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}
Implementing the Redis TableFunction
Next, taking redis as the example, we implement the TableFunction that fetches dimension data from redis:
import com.github.benmanes.caffeine.cache.LoadingCache;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStringCommands;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.List;

public class RedisTableFunction extends AbstractTableFuc {
    private static final long serialVersionUID = -1408840130246375742L;

    private final String ip;
    private final int port;
    private final String database;
    private final String password;
    private final String tableName;

    private transient RedisClient redisClient;
    private transient StatefulRedisConnection<String, String> connection;
    private transient RedisStringCommands<String, String> commands;

    private RedisTableFunction(Builder builder) {
        super(builder.fieldNames, builder.fieldTypes, builder.joinKeyNames,
                builder.isCached, builder.cacheType, builder.cacheMaxSize, builder.cacheExpireMs,
                builder.maxRetryTimes);
        ip = builder.ip;
        port = builder.port;
        database = builder.database;
        password = builder.password;
        tableName = builder.tableName;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        initCache();
        // note: password is not wired into the URI here; add it if your redis requires auth
        redisClient = RedisClient.create("redis://" + ip + ":" + port + "/" + database);
        connection = redisClient.connect();
        commands = connection.sync();
    }

    @Override
    Row getRow(Object redisKeyPre) {
        Object[] fieldValues = new Object[fieldNames.length];
        String[] redisKeys = FieldUtil.getRedisKeys((String) redisKeyPre, otherFieldNames);
        // fill in the non-key field values with a single MGET
        List<KeyValue<String, String>> mget = commands.mget(redisKeys);
        for (int i = 0; i < mget.size(); i++) {
            String value = mget.get(i).getValue();
            if (value == null) {
                throw new RuntimeException("table or column not found");
            }
            fieldValues[otherFieldNamesIndexes[i]] = value;
        }
        // fill in the join key values, parsed back out of the key prefix
        List<String> list = Arrays.asList(redisKeyPre.toString().split(":"));
        for (int i = 0; i < joinKeyIndexes.length; i++) {
            fieldValues[joinKeyIndexes[i]] = list.get(list.indexOf(joinKeyNames[i]) + 1);
        }
        return Row.of(fieldValues);
    }

    public void eval(Object... params) {
        // build the key prefix: tableName:keyName1:keyValue1:keyName2:keyValue2...
        String keyPre = tableName;
        for (int i = 0; i < params.length; i++) {
            keyPre = keyPre + ":" + (joinKeyNames[i]) + ":" + (params[i]);
        }
        if (isCached) {
            if ("ALL".equals(cacheType) && cache instanceof LoadingCache) {
                LoadingCache<String, Row> loadingCache = (LoadingCache<String, Row>) cache;
                collect(loadingCache.get(keyPre));
            } else {
                // load through the cache on a miss, then emit the row
                collect(cache.get(keyPre, this::getRow));
            }
        } else {
            Row row = getRow(keyPre);
            collect(row);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (redisClient != null) {
            redisClient.shutdown();
        }
        if (cache != null) {
            cache = null;
        }
    }

    public static final class Builder {
        private String ip;
        private int port;
        private String database;
        private String password;
        private String tableName;
        private String[] fieldNames;
        private TypeInformation[] fieldTypes;
        private String[] joinKeyNames;
        private boolean isCached;
        private String cacheType;
        private long cacheMaxSize;
        private long cacheExpireMs;
        private int maxRetryTimes;

        private Builder() {
        }

        public Builder withIp(String val) {
            ip = val;
            return this;
        }

        public Builder withPort(int val) {
            port = val;
            return this;
        }

        public Builder withDatabase(String val) {
            database = val;
            return this;
        }

        public Builder withPassword(String val) {
            password = val;
            return this;
        }

        public Builder withTableName(String val) {
            tableName = val;
            return this;
        }

        public Builder withFieldNames(String[] val) {
            fieldNames = val;
            return this;
        }

        public Builder withFieldTypes(TypeInformation[] val) {
            fieldTypes = val;
            return this;
        }

        public Builder withJoinKeyNames(String[] val) {
            joinKeyNames = val;
            return this;
        }

        public Builder withIsCached(boolean val) {
            isCached = val;
            return this;
        }

        public Builder withCacheType(String val) {
            cacheType = val;
            return this;
        }

        public Builder withCacheMaxSize(long val) {
            cacheMaxSize = val;
            return this;
        }

        public Builder withCacheExpireMs(long val) {
            cacheExpireMs = val;
            return this;
        }

        public Builder withMaxRetryTimes(int val) {
            maxRetryTimes = val;
            return this;
        }

        public RedisTableFunction build() {
            return new RedisTableFunction(this);
        }
    }
}
One more note: the redis storage layout for the dimension table looks like this:
- userInfo is the dimension table name
- userId is the join key name, immediately followed by its value; with multiple keys they follow the order of the join on clause, e.g. for a join b on b.id = a.id and b.xx = a.xx the keys take the form tableName:id:idValue:xx:xxValue:fieldName (see the seeding sketch below)
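To make the layout concrete, here is a small sketch that seeds one userInfo row in that format, using the same lettuce client the function uses. The key and value contents are hypothetical test data:

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.sync.RedisCommands;

public class SeedDimTable {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://localhost:6379/0");
        RedisCommands<String, String> commands = client.connect().sync();
        // tableName:joinKeyName:joinKeyValue:fieldName -> fieldValue
        commands.set("userInfo:userId:1001:userName", "Alice");
        commands.set("userInfo:userId:1001:userSex", "female");
        client.shutdown();
    }
}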
Next, let's test it. RedisLookupTableSource below is the concrete LookupableTableSource that wires the builder options through to RedisTableFunction; its full code is in the source repo linked at the end.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.KEY_DISABLE_METRICS;

public class LookUpTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020/flink/userClick_Random_100W", "UTF-8");

        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
        String[] fields = new String[]{"id", "user_click", "time"};
        RowTypeInfo typeInformation = new RowTypeInfo(types, fields);

        // parse the CSV lines into Rows according to the declared field types
        DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
            private static final long serialVersionUID = 2349572544179673349L;

            @Override
            public Row map(String s) {
                String[] split = s.split(",");
                Row row = new Row(split.length);
                for (int i = 0; i < split.length; i++) {
                    Object value = split[i];
                    if (types[i].equals(Types.STRING)) {
                        value = split[i];
                    }
                    if (types[i].equals(Types.LONG)) {
                        value = Long.valueOf(split[i]);
                    }
                    row.setField(i, value);
                }
                return row;
            }
        }).returns(typeInformation);

        // register the stream as a table, appending a processing-time attribute
        tableEnv.registerDataStream("user_click_name", stream,
                String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");

        RedisLookupTableSource tableSource = getTableSource(false, "ALL", 10000, 30000);
        tableEnv.registerTableSource("info", tableSource);

        String sql = "select a.id, a.user_click, i.userName, i.userSex" +
                " from" +
                " user_click_name as a" +
                " join info FOR SYSTEM_TIME AS OF a.proctime as i on i.userId = a.id";

        Table table = tableEnv.sqlQuery(sql);

        DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
        result.print().setParallelism(1);
        //sendToKafka(result);
        tableEnv.execute("LookUpTest");
    }

    private static RedisLookupTableSource getTableSource(boolean isCached, String cacheType, long cacheMaxSize, long cacheExpireMs) {
        return RedisLookupTableSource.newBuilder()
                .withIp("172.16.44.28")
                .withPort(6379)
                .withDatabase("0")
                .withIsCached(isCached)
                .withCacheType(cacheType)
                .withCacheMaxSize(cacheMaxSize)
                .withCacheExpireMs(cacheExpireMs)
                .withTableName("userInfo")
                .withFieldNames(new String[]{"userId", "userName", "userSex"})
                .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING})
                .withIsAsync(false)
                .build();
    }

    private static void sendToKafka(DataStream<Row> result) {
        DataStream<String> map = result.map((MapFunction<Row, String>) Row::toString);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.16.12.155:9094");
        properties.setProperty("retries", "3");
        properties.setProperty(KEY_DISABLE_METRICS, "true");
        map.addSink(new FlinkKafkaProducer010<>("side_table_test", new SimpleStringSchema(StandardCharsets.UTF_8), properties));
    }
}
The job prints the joined rows, each stream record enriched with userName and userSex.
Some Notes
Join Syntax
The Blink documentation mentions that dimension table joins must follow the temporal table join syntax.
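Concretely, that is the FOR SYSTEM_TIME AS OF pattern already used in the test above; the clause is what tells the planner to translate the join into lookups against the dimension table:

select a.id, a.user_click, i.userName, i.userSex
from user_click_name as a
join info FOR SYSTEM_TIME AS OF a.proctime as i
  on i.userId = a.id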
References
https://zhuanlan.zhihu.com/p/79800113
https://github.com/apache/flink/tree/blink
http://fetching118.com/blink_doc/dev/table/sourceSinks.html#defining-a-tablesource-with-lookupable
Source Code
https://github.com/RockeyCui/learn-flink/tree/dev_1.0.0